package com.echatim.service.app;

import com.alibaba.fastjson.JSONObject;
import com.annotation.MethodFor;
import com.annotation.TopicFor;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.broker.base.event.ClusterDispatcherEvent;
import com.broker.base.protocol.response.Resp;
import com.commom.AppRespError;
import com.commom.DBConst;
import com.commom.Topic;
import com.echatim.ApplicationWrapper;
import com.echatim.broker.localsvc.TopicSender;
import com.echatim.entity.*;
import com.echatim.form.MessageSendForm;
import com.echatim.mapper.MessageMapper;
import com.utils.Beans;
import com.utils.Streams;
import com.utils.UIDUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.validation.Valid;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Service that handles upstream chat messages for the echatim application.
 *
 * <p>{@link #send(MessageSendForm)} validates the request, stores the message, stores one
 * {@link MessageStableOffline} record per recipient and finally hands a
 * {@link ClusterDispatcherEvent} to the {@link TopicSender} for routing.
 *
 * <p>The bean is only created when {@code echatim.sdk.auth-type} is {@code community}.
 *
 * @author kong &lt;androidsimu@163.com&gt;
 * create by 2019/2/21 13:21
 */
@Slf4j
@ConditionalOnProperty(name="echatim.sdk.auth-type", havingValue="community")
@TopicFor(value = Topic.APP_UPSTREAM_MESSAGE.name)
@DS("master")
@Service
public class MessageService extends ServiceImpl<MessageMapper, Message>  {
    // NOTE: TopicSender is intentionally not injected as a field; send() resolves it from the
    // application context at call time.
    @Autowired
    private MessageStableOfflineService messageStableOfflineService;
    @Autowired
    private UserService userService;
    @Autowired
    private RoomService roomService;
    @Autowired
    private RoomUserService roomUserService;
    /** Stress-test switch; defaults to {@code false} when the property is not configured. */
    @Value("${echatim.sdk.stress-test:false}")
    private Boolean stressTest = false;

    /**
     * Stores a message sent by a user and forwards it to the cluster dispatcher.
     *
     * <p>All validation (message way, sender, P2P target user, P2R room) is performed before
     * anything is written. Validation failures are reported through a failed {@link Resp}
     * rather than an exception, and a normally-returning {@code @Transactional} method commits,
     * so writing first would leave an orphan message row behind.
     *
     * @param sendForm the message to send; its way must be one of P2P, P2R or P2LR
     * @return a successful response, or a failed response describing which check did not pass
     */
    @Transactional
    @MethodFor(value = Topic.APP_UPSTREAM_MESSAGE.METHOD.SEND, consumer = MessageSendForm.class)
    public Resp<String> send(@Valid MessageSendForm sendForm){
        List<String> supportedWays = Arrays.asList(
                DBConst.MessageWay.P2P.name(),
                DBConst.MessageWay.P2R.name(),
                DBConst.MessageWay.P2LR.name());
        if(!supportedWays.contains(sendForm.getWay())){
            return Resp.failed(AppRespError.UNKNOWN_MESSAGE_WAY);
        }
        int fromUserCnt = userService.lambdaQuery().eq(User::getAuid, sendForm.getFromUser()).count();
        if(fromUserCnt == 0){
            return Resp.failed(String.format("发送用户:%s不存在.", sendForm.getFromUser()), AppRespError.FROM_AUID_NOT_EXIST.getCode());
        }

        Message msg = Beans.copy(sendForm, Message.class)
                .setUid(UIDUtils.gen())
                .setAppKey(sendForm.getUserJwt().getAppKey())
                .setBody(sendForm.getBody().toString())
                .setFromUser(sendForm.getFromUser())
                .setStatus(DBConst.EntryStatus.NORMAL.name())
                .setCreateTime(new Date());
        boolean p2p = DBConst.MessageWay.P2P.name().equals(msg.getWay());
        boolean p2r = DBConst.MessageWay.P2R.name().equals(msg.getWay());

        // Validate the target BEFORE the first write (see method Javadoc).
        Room room = null;
        if(p2p){
            int cnt = userService.lambdaQuery().eq(User::getAuid, sendForm.getToTarget()).count();
            if(cnt == 0){
                return Resp.failed(String.format("目标用户:%s不存在.", sendForm.getToTarget()), AppRespError.TO_AUID_NOT_EXIST.getCode());
            }
        }
        else if(p2r){
            room = roomService.lambdaQuery().eq(Room::getId, sendForm.getToTarget()).one();
            if(room == null){
                return Resp.failed("IM群不存在:rid=" + sendForm.getToTarget(), AppRespError.ROOM_NOT_EXIST.getCode());
            }
        }

        this.save(msg);

        // Build the forwarding event; routing is left to the client dispatcher component.
        ClusterDispatcherEvent dispatcherMsg = new ClusterDispatcherEvent();
        if(p2p){
            messageStableOfflineService.save(
                    newOfflineRecord(sendForm, msg).setToUser(sendForm.getToTarget()));
        }
        else if(p2r){
            final Room targetRoom = room; // effectively-final copy for use inside the lambda
            // Look up every member of the room, then drop the sender.
            List<RoomUser> roomUsers = roomUserService.lambdaQuery().eq(RoomUser::getRid, targetRoom.getId()).list();
            roomUsers = roomUsers.stream()
                    .filter(v -> !v.getAuid().equals(msg.getFromUser()))
                    .collect(Collectors.toList());
            List<MessageStableOffline> messageStableOfflines = Streams.map(roomUsers, roomUser ->
                    newOfflineRecord(sendForm, msg)
                            .setRid(targetRoom.getId())
                            .setToUser(roomUser.getAuid())); // the member's auid

            // Nothing to persist when the sender is the only member; skipping also avoids
            // handing an empty collection to saveBatch.
            if(!messageStableOfflines.isEmpty()){
                messageStableOfflineService.saveBatch(messageStableOfflines);
            }

            dispatcherMsg.setRoomMembers(roomUsers.stream()
                    .map(v->v.getAuid())
                    .collect(Collectors.toList()));
        }

        dispatcherMsg.setMid(msg.getUid()+"");
        dispatcherMsg.setWay(msg.getWay());
        dispatcherMsg.setType(sendForm.getType());
        dispatcherMsg.setFrom(sendForm.getFromUser());
        dispatcherMsg.setTo(sendForm.getToTarget());
        dispatcherMsg.setMsgBody(modifyMessageBody(sendForm.getBody()));
        dispatcherMsg.setAppKey(sendForm.getUserJwt().getAppKey());

        TopicSender topicSender = (TopicSender)ApplicationWrapper.getContext().getBean("topicSender");
        if(topicSender != null && topicSender.getSender() != null){
            topicSender.getSender().send(dispatcherMsg);
        }
        else {
            // The message is already stored; make the skipped dispatch visible instead of silent.
            log.warn("No topic sender available, message mid={} was stored but not dispatched", msg.getUid());
        }
        return Resp.ok("发送成功");
    }

    /**
     * Builds the part of an offline-delivery record that is common to every recipient.
     * The caller still has to set the recipient (and the room id for room messages).
     */
    private MessageStableOffline newOfflineRecord(MessageSendForm sendForm, Message msg){
        return Beans.copy(sendForm, MessageStableOffline.class)
                .setUid(UIDUtils.gen())
                .setMid(msg.getUid())
                .setBody(sendForm.getBody().toString())
                .setFromUser(sendForm.getFromUser())
                .setWay(msg.getWay())
                .setAppKey(sendForm.getUserJwt().getAppKey())
                .setStatus(DBConst.MessageStableOfflineStatus.OFFLINE.name())
                .setCreateTime(new Date());
    }

    /**
     * In stress-test mode, adds a {@code serverTime} field (epoch milliseconds) to the body so
     * the server-side processing time can be recorded. The given object is modified in place
     * and returned.
     */
    private JSONObject modifyMessageBody(JSONObject modifyBody){
        // Null-safe check: stressTest is a boxed Boolean.
        if(Boolean.TRUE.equals(stressTest)){
            modifyBody.put("serverTime", System.currentTimeMillis());
        }
        return modifyBody;
    }

    /**
     * Currently a stub: ignores {@code sendForm} and returns a successful response with a
     * {@code null} payload.
     */
    public Resp<String> getMessageSessions(MessageSendForm sendForm) {
        return Resp.ok(null);
    }
}
